package com.hao.chapter11;


import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demonstrates using the Flink Table API and SQL together.
 *
 * <p>The program registers a CSV-formatted filesystem table, runs a SQL query that keeps
 * the rows whose {@code id} is greater than 1, prints the query result to the console and
 * finally inserts the same result into a JDBC-backed table.
 */
public class TableAPIAndSQLOpJava {

    /** DDL registering the input table {@code myTable} (filesystem connector, CSV format). */
    private static final String SOURCE_TABLE_DDL =
            "create table myTable(\n"
                    + "id int,\n"
                    + "name string\n"
                    + ") with (\n"
                    + "'connector.type' = 'filesystem',\n"
                    + "'connector.path' = 'D:\\data\\source.txt',\n"
                    + "'format.type' = 'csv'\n"
                    + ")";

    /**
     * DDL registering the output table {@code newTable} (JDBC connector).
     *
     * <p>For reference, a filesystem sink variant uses these {@code with} options instead:
     * {@code 'connector.type' = 'filesystem'}, {@code 'connector.path' = 'D:\\data\\res'},
     * {@code 'format.type' = 'csv'}.
     */
    private static final String SINK_TABLE_DDL =
            "create table newTable(\n"
                    + "id int,\n"
                    + "name string\n"
                    + ") with (\n"
                    + "'connector.type' = 'jdbc',\n"
                    + "'connector.url' = 'jdbc:mysql://127.0.0.1:3306/test_flink',\n"
                    + "'connector.table' = 'sensor_count',\n"
                    + "'connector.driver' = 'com.mysql.jdbc.Driver',\n"
                    + "'connector.username' = 'root',\n"
                    + "'connector.password' = 'root'\n"
                    + ")";

    /** SQL query that selects {@code id} and {@code name} for rows with {@code id > 1}. */
    private static final String FILTER_QUERY = "select id,name from myTable where id > 1";

    /**
     * Entry point: registers the tables, runs the query, prints it and writes it to the sink.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final TableEnvironment tableEnv = createStreamingTableEnvironment();

        // Register the input table.
        tableEnv.executeSql(SOURCE_TABLE_DDL);

        // Query and filter with SQL. The same query written with the Table API, kept for reference:
        //   tableEnv.from("myTable")
        //           .select($("id"), $("name"))
        //           .filter($("id").isGreater(1));
        final Table filteredRows = tableEnv.sqlQuery(FILTER_QUERY);

        // Print the query result to the console.
        filteredRows.execute().print();

        // Register the output table, then insert the query result into it.
        tableEnv.executeSql(SINK_TABLE_DDL);
        filteredRows.executeInsert("newTable").print();
    }

    /** Builds a {@link TableEnvironment} configured with the Blink planner in streaming mode. */
    private static TableEnvironment createStreamingTableEnvironment() {
        final EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        return TableEnvironment.create(settings);
    }
}

